| Тип | Описание | Пример |
|---|---|---|
| Inductive | Разные задачи, может быть тот же домен | ImageNet → медицина |
| Transductive | Та же задача, разные домены | Отзывы книг → фильмы |
| Unsupervised | Нет меток в обоих доменах | Кластеризация cross-domain |
Перенос представлений признаков:
from sklearn.ensemble import RandomForestClassifier
from sklearn.decomposition import PCA
# Source domain - большой датасет
rf_source = RandomForestClassifier(n_estimators=100)
rf_source.fit(X_source, y_source)
# Извлечение признаков (листья деревьев как признаки)
X_source_features = rf_source.apply(X_source)
X_target_features = rf_source.apply(X_target)
# PCA на source
pca = PCA(n_components=50)
pca.fit(X_source_features)
# Применение к target
X_target_transformed = pca.transform(X_target_features)
# Обучение на новых признаках
model_target = RandomForestClassifier()
model_target.fit(X_target_transformed, y_target)
Взвешивание примеров из source domain:
import numpy as np
from sklearn.ensemble import GradientBoostingClassifier
# Оценка важности каждого примера из source
def importance_weighting(X_source, X_target):
# Обучаем классификатор различать домены
from sklearn.linear_model import LogisticRegression
# Метки: 0 = source, 1 = target
X_combined = np.vstack([X_source, X_target])
y_domain = np.hstack([
np.zeros(len(X_source)),
np.ones(len(X_target))
])
domain_clf = LogisticRegression()
domain_clf.fit(X_combined, y_domain)
# Вероятность принадлежности target
proba_target = domain_clf.predict_proba(X_source)[:, 1]
proba_source = domain_clf.predict_proba(X_source)[:, 0]
# Веса = P(target) / P(source)
weights = proba_target / (proba_source + 1e-10)
return weights
weights = importance_weighting(X_source, X_target)
# Обучение с весами
model = GradientBoostingClassifier()
model.fit(X_source, y_source, sample_weight=weights)
# Начинаем с параметров source модели
from sklearn.linear_model import LogisticRegression
# Модель на source
model_source = LogisticRegression(max_iter=1000)
model_source.fit(X_source, y_source)
# Инициализация target модели параметрами source
model_target = LogisticRegression(
max_iter=1000,
warm_start=True
)
# Установка начальных весов
model_target.coef_ = model_source.coef_.copy()
model_target.intercept_ = model_source.intercept_.copy()
model_target.classes_ = model_source.classes_
# Fine-tuning на target
model_target.fit(X_target, y_target)
# Или частичное обновление
from sklearn.linear_model import SGDClassifier
sgd = SGDClassifier(warm_start=True)
sgd.coef_ = model_source.coef_.copy()
sgd.intercept_ = model_source.intercept_.copy()
# Постепенное дообучение
sgd.partial_fit(X_target, y_target, classes=np.unique(y_target))
Минимизация domain shift:
# Maximum Mean Discrepancy (MMD)
def mmd_loss(X_source, X_target, kernel='rbf', gamma=1.0):
from sklearn.metrics.pairwise import rbf_kernel
n_source = len(X_source)
n_target = len(X_target)
# Kernel matrices
K_ss = rbf_kernel(X_source, X_source, gamma)
K_tt = rbf_kernel(X_target, X_target, gamma)
K_st = rbf_kernel(X_source, X_target, gamma)
# MMD^2
mmd = (K_ss.sum() / (n_source ** 2) +
K_tt.sum() / (n_target ** 2) -
2 * K_st.sum() / (n_source * n_target))
return mmd
# Feature alignment
from sklearn.preprocessing import StandardScaler
scaler_source = StandardScaler()
X_source_scaled = scaler_source.fit_transform(X_source)
scaler_target = StandardScaler()
X_target_scaled = scaler_target.fit_transform(X_target)
# Align distributions
from scipy.stats import wasserstein_distance
def align_features(X_source, X_target):
X_aligned = X_target.copy()
for i in range(X_target.shape[1]):
# Match moments
mean_s, std_s = X_source[:, i].mean(), X_source[:, i].std()
mean_t, std_t = X_target[:, i].mean(), X_target[:, i].std()
X_aligned[:, i] = (X_target[:, i] - mean_t) / std_t * std_s + mean_s
return X_aligned
X_target_aligned = align_features(X_source_scaled, X_target_scaled)
Transfer AdaBoost для domain adaptation:
from sklearn.tree import DecisionTreeClassifier
import numpy as np
class TrAdaBoost:
def __init__(self, n_estimators=10):
self.n_estimators = n_estimators
self.models = []
self.alphas = []
def fit(self, X_source, y_source, X_target, y_target):
n_source = len(X_source)
n_target = len(X_target)
# Объединение данных
X_all = np.vstack([X_source, X_target])
y_all = np.hstack([y_source, y_target])
# Начальные веса
weights = np.ones(n_source + n_target)
weights[:n_source] = 1.0 / n_source
weights[n_source:] = 1.0 / n_target
for t in range(self.n_estimators):
# Обучение слабого классификатора
clf = DecisionTreeClassifier(max_depth=3)
clf.fit(X_all, y_all, sample_weight=weights)
# Предсказания
y_pred = clf.predict(X_all)
errors = (y_pred != y_all).astype(int)
# Ошибка на target
error_target = (weights[n_source:] * errors[n_source:]).sum()
if error_target > 0.5 or error_target == 0:
break
# Альфа
beta = error_target / (1 - error_target)
alpha = np.log(1 / beta) / 2
# Обновление весов
# Source: уменьшаем вес при ошибке
weights[:n_source] *= np.power(beta, -errors[:n_source])
# Target: увеличиваем вес при ошибке
weights[n_source:] *= np.power(beta, errors[n_source:])
# Нормализация
weights /= weights.sum()
self.models.append(clf)
self.alphas.append(alpha)
def predict(self, X):
predictions = np.zeros((len(X), len(self.models)))
for i, (model, alpha) in enumerate(zip(self.models, self.alphas)):
predictions[:, i] = model.predict(X) * alpha
return (predictions.sum(axis=1) > 0).astype(int)
# Использование
tradaboost = TrAdaBoost(n_estimators=10)
tradaboost.fit(X_source, y_source, X_target, y_target)
y_pred = tradaboost.predict(X_test)
from sklearn.ensemble import RandomForestClassifier
import numpy as np
def self_training_transfer(X_source, y_source, X_target,
confidence_threshold=0.9, max_iter=10):
# Начальная модель на source
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_source, y_source)
# Unlabeled target data
X_unlabeled = X_target.copy()
X_labeled = X_source.copy()
y_labeled = y_source.copy()
for iteration in range(max_iter):
# Предсказания на unlabeled
proba = model.predict_proba(X_unlabeled)
max_proba = proba.max(axis=1)
# Высокоуверенные предсказания
confident_mask = max_proba >= confidence_threshold
if confident_mask.sum() == 0:
break
# Добавление к labeled
X_new = X_unlabeled[confident_mask]
y_new = model.predict(X_new)
X_labeled = np.vstack([X_labeled, X_new])
y_labeled = np.hstack([y_labeled, y_new])
# Удаление из unlabeled
X_unlabeled = X_unlabeled[~confident_mask]
# Переобучение
model.fit(X_labeled, y_labeled)
print(f"Iteration {iteration+1}: added {confident_mask.sum()} samples")
return model
model = self_training_transfer(X_source, y_source, X_target)
Одновременное обучение на нескольких задачах:
from sklearn.multioutput import MultiOutputClassifier
from sklearn.ensemble import RandomForestClassifier
# Несколько связанных задач
# y_task1, y_task2 - разные задачи
y_multi = np.column_stack([y_task1, y_task2])
# Multi-task модель
model = MultiOutputClassifier(
RandomForestClassifier(n_estimators=100, random_state=42)
)
model.fit(X, y_multi)
# Предсказания для обеих задач
predictions = model.predict(X_test)
pred_task1 = predictions[:, 0]
pred_task2 = predictions[:, 1]
# Или с разделяемыми параметрами
from sklearn.linear_model import MultiTaskLasso
# Lasso с общими признаками
mtl = MultiTaskLasso(alpha=0.1)
mtl.fit(X_train, y_multi_train)
# Коэффициенты показывают общие признаки
shared_features = (mtl.coef_[:, :] != 0).sum(axis=0) > 1
print(f"Shared features: {shared_features.sum()}")
from sklearn.metrics import accuracy_score
# A-distance - мера различия доменов
def a_distance(X_source, X_target):
from sklearn.svm import LinearSVC
X_combined = np.vstack([X_source, X_target])
y_domain = np.hstack([
np.zeros(len(X_source)),
np.ones(len(X_target))
])
clf = LinearSVC()
from sklearn.model_selection import cross_val_score
scores = cross_val_score(clf, X_combined, y_domain, cv=5)
# A-distance ≈ 2(1 - 2*error)
error = 1 - scores.mean()
a_dist = 2 * (1 - 2 * error)
return a_dist
# Transfer ratio
def transfer_ratio(source_perf, target_perf, baseline_perf):
# Насколько transfer помог по сравнению с baseline
return (target_perf - baseline_perf) / (source_perf - baseline_perf)
# Negative transfer detection
baseline_acc = accuracy_score(y_test, model_baseline.predict(X_test))
transfer_acc = accuracy_score(y_test, model_transfer.predict(X_test))
if transfer_acc < baseline_acc:
print("Warning: Negative transfer detected!")
print(f"Baseline: {baseline_acc:.3f}, Transfer: {transfer_acc:.3f}")
# Пример с scikit-learn SGD
from sklearn.linear_model import SGDClassifier
# Начальная модель
source_model = SGDClassifier(max_iter=1000, random_state=42)
source_model.fit(X_source, y_source)
# Fine-tuning с меньшим learning rate
target_model = SGDClassifier(
max_iter=100,
learning_rate='constant',
eta0=0.001, # меньше чем обычно
warm_start=True,
random_state=42
)
# Копирование весов
target_model.coef_ = source_model.coef_.copy()
target_model.intercept_ = source_model.intercept_.copy()
target_model.classes_ = source_model.classes_
# Дообучение
target_model.fit(X_target, y_target)
«Transfer Learning — это как использование опыта врача из одной специальности в смежной области. Модель, обученная на большом датасете, уже знает общие паттерны, и нам нужно лишь адаптировать эти знания под нашу конкретную задачу, экономя время и данные».